使用NODEJS+REDIS開發一個消息隊列以及定時任務處理


作者:RobanLee

原創文章,轉載請注明: 蘿卜李 http://www.robanlee.com

源碼在這里: https://github.com/robanlee123/RobCron




時間有限,就不詳細注釋,有問題或者意見歡迎@我,也歡迎大家批評指正.

本文所必須的一些資料如下:

1. NODEJS ==> 可以去NODEJS.ORG下載最新的源碼.
2. Redis ==> Redis.io
3. KUE ==> Nodejs的一個開源隊列系統
4. NODE-SCHEDULE ==> NODEJS 一個開源調度系統

廢話不多說,先來介紹任務的流程:

1. NODEJS或者PHP或者其他語言 寫入REDIS 一個計划任務, 比如每分鍾做某件事,這里就用SAYHELLO來代替好了
2. 使用NODEJS讀取這個任務,並將它轉換為NODE的調度任務(node-schedule 來完成)
3. 調度器[node-schedule]根據設定的規則來分發任務.
4. KUE接受任務,並且加入列隊,執行.
5. DONE

 

STEP1: 創建一個任務

/**
 * Add task
 * @author Robanlee@gmail.com
 */

//加載函數,集中加載一些LIB,這個源碼請參照最后的附屬文件
var loader = require('./loader');  

function addTask(opts){
        new loader(this); 
        
        //默認設置
        this.opts = {
                keyIDs:'schedule:job:ids',
                keyLists:'schedule:job:list',
                keyJob:'schedule:job:'
        }
        
        //合並配置,類似JQUERY: extend
        this.mergeParams(opts);  
};

//Merge options
addTask.prototype.mergeParams = function ( param ){
        if(undefined === this.opts ) {
                return false;
        }
        
        for(var x in param) { 
                if(param[x] != undefined && '' != param[x]) {
                        this.opts[x] = param[x];
                }
        }
};

//添加數據方法
addTask.prototype.pushData = function ( data ){ 
        if(undefined == data ) {
                console.log('--ERROR:data is null');
                return false;
        }
        this.getIncr.call(this,function(response,obj){
                var id = response;
                obj.redisClient.rpush(obj.opts.keyLists,id,function(err,response){
                        if(err) throw err;
                });
         
                data.id = id;
                var m = obj.redisClient.multi();
                for(var x in data) {
                        m.hset( obj.opts.keyJob+id,x,data[x] );
                }
                
                m.exec(function(err,response){
                        if(err) throw err; 
                        console.log('[info] Task: ['+data.name+'] has been set successful!');
                });   
               
        }); 
};

//獲取REDIS目前的自增ID
addTask.prototype.getIncr = function (callBack){
        var obj = this; 
        this.redisClient.incr(this.opts.keyIDs,function(err,response){
                console.log("[info] Current id is : " + response);
                callBack(response, obj);
        });
};

 

 

加載這個lib 寫入一個DEMO:

var data = { 
        'name':'taskDemo',
        'created':Date.now(),
        'state':1,
        'type':'untitled',
        'rule':'*/1 * * * *'  //這個任務規則可以為CRONTAB的規則,這個表示每分鍾執行一次
};

var job = new addTask();
job.pushData(data);

 

執行這個腳本,如果一切正常,你會看到如下信息:

NODEJS 輸出:

REDIS:

 

 

接下來就是獲取數據,並且轉換為調度任務了,

源碼:

var loader = require('./loader');
var taskLog = require("./TaskLog");

function scheduleTask(){
        new loader(this); 
        this.opts = {
                keyIDs:'schedule:job:ids',
                keyLists:'schedule:job:list',
                keyJob:'schedule:job:'
        }
        
        this.task = {
                taskDemo:undefined
        };
        
        //監聽取消任務操作
        this.listenCancel();
};

scheduleTask.prototype.setScheduleTask = function (data,obj){ 
        this.task[data.name] =  this.libs['node-schedule'].scheduleJob(data.rule,function(){
                obj.setQueue(data);
                console.log('[info] Task :' + data.name + ' has been set in queue!');
        });
         
};

scheduleTask.prototype.setQueue = function (datas){
        
        var jobs = this.libs.kue.createQueue(); 
        jobs.create(datas.name,{
                'name:':datas.name,
                'state':1
        }).save();
         
        console.log("[info] Task ["+datas.name+"] has been queued!");
        
        this.setLog(datas);
};

scheduleTask.prototype.setLog = function (responseData){
        var logData = {
                jobid:responseData.id,
                name:responseData.name,
                result:1
        };
                                        
        new taskLog(logData);
        console.log("[info] Task has been loged");
};


scheduleTask.prototype.getJob = function (){
        this.getJobIndex.call(this,function(response,obj){
                for(var x in response ) {
                        obj.redisClient.hgetall(obj.opts.keyJob+response[x],function(err,data){
                                console.log("[info] Task:["+data.name+"] has been loaded!");
                                obj.setScheduleTask(data, obj);
                        });
                }
        });
};


scheduleTask.prototype.getJobIndex = function(callBack){
        //Read tasks from <list schedule:job:list>
        var o = this;
        this.redisClient.lrange(this.opts.keyLists,0,-1,function(err,response){
                if(err) throw err;
                callBack(response, o);
        });
};

scheduleTask.prototype.listenCancel = function (){
        var job = this.libs.kue.createQueue();
        var that = this;

        job.process('cancelJob',function(data,done){  
                that.task[data.data.data].cancel();
                console.log('[info] Task: '+data.data.data + ' has been canceled') ;
                done();
        });
}

 

執行代碼:

var x = new scheduleTask();
x.getJob();

 

等待一分鍾后,NODEJS控制台會輸出(這個任務在沒有取消的情況下,每分鍾都會執行):

 

第二分鍾:

 

REDIS 現在的數據:

這個數據中增加了KUE的一些任務, q:job:[]:inactive 這個就標識任務還未被執行,執行后的任務狀態有

complete active failed delay 四種

 

至此,就只剩下執行任務的步驟了

var loader = require('./loader');

function execTask(){
        new loader(this); 
        
        var job = this.libs.kue.createQueue();
        job.process('taskDemo',function(data,done){
                console.log('[info] Task:'+data.type+'#'+data.id+' has been executed successful!');
                
                
                //DONE之前可以做你想要做的事情
                
                done(); //千萬別忘記調用此方法
        });
        
        
}

//添加一個取消定時任務的KUE任務
execTask.prototype.addCancelJob = function (){
        var job =this.libs.kue.createQueue();
        job.create('cancelJob', {data:'taskDemo'}).save();
        console.log('[info] Task: cancelJob has been send!');
}

 

執行這個腳本:

var et = new execTask();

//取消定時任務
et.addCancelJob();

 

 

執行后會有2個結果

1. 程序會執行當前列隊里的任務.

2. 定時任務會被取消,下一分鍾后任務不會再由SCHEDULE分配

 

任務執行結果:

 

取消任務的回應:

注意最后一行…


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM