Node.js 使用 Bull 實現延時隊列


bull.js

const Queue = require('bull');

// Connection settings for the Redis instance that backs the queue.
const redisOptions = {
  port: 6379,
  host: '127.0.0.1',
  db: 3,
  password: null,
};

// Options applied to every job unless overridden when the job is added.
const defaultJobOptions = {
  attempts: 1,
  removeOnComplete: true,
  backoff: false,
  delay: 0,
};

// Rate limit: `max` jobs per `duration`.
const limiter = {
  max: 200000,
  duration: 1000,
};

// Advanced queue settings.
const settings = {
  maxStalledCount: 1,
  // Rescheduling delay.
  guardInterval: 1,
  // Delay before processing the next job in case of an internal error.
  retryProcessDelay: 500,
  // Left disabled by the original author (wait time of brpoplpush on an
  // empty queue): drainDelay: 50000
};

/**
 * Shared Bull queue named 'nike'. Other scripts require this module to get
 * the same queue configuration.
 */
const queue = new Queue('nike', {
  redis: redisOptions,
  prefix: 'nike_',
  defaultJobOptions,
  limiter,
  settings,
});

module.exports = queue;

 

生產者

const queue = require('./bull');
const random = require('random-string');
const log4js = require('log4js');

const logger = log4js.getLogger();
logger.level = 'info';

// Number of demo jobs to enqueue and the delay applied to each of them.
const JOB_COUNT = 10;
const JOB_DELAY_MS = 5000;
// Length of the random payload key attached to every job.
const KEY_LENGTH = 10;

// Log progress reports for any job on the queue.
queue.on('global:progress', (jobId, progress) => {
  logger.info(`Job ${jobId} is ${progress * 100}% ready!`);
});

// Log the id of every job that completes.
queue.on('global:completed', (jobId) => {
  logger.info(`Job with id ${jobId} has been completed`);
});

/**
 * Enqueue JOB_COUNT delayed jobs one after another, logging each job's
 * payload together with the value reported by `queue.count()`.
 *
 * @returns {Promise<void>} Resolves once every job has been added; rejects
 *   if adding a job or reading the count fails.
 */
const main = async () => {
  for (let i = 0; i < JOB_COUNT; i++) {
    const job = await queue.add(
      // random-string takes an options object; a bare number is not a
      // documented way to request a length.
      { key: random({ length: KEY_LENGTH }) },
      { delay: JOB_DELAY_MS },
    );
    logger.info('生產者:', job.data, await queue.count());
  }
};

// Surface failures instead of leaving an unhandled promise rejection.
main().catch((err) => {
  logger.error('Producer failed:', err);
  process.exitCode = 1;
});

消費者

const queue = require('./bull');
const log4js = require('log4js');

const logger = log4js.getLogger();
logger.level = 'info';

// Artificial per-job processing time used by this demo consumer.
const SIMULATED_WORK_MS = 1000;

/**
 * Resolve after the given number of milliseconds.
 *
 * @param {number} ms - Time to wait.
 * @returns {Promise<void>}
 */
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Register the job processor on the shared queue. Each job's payload is
 * logged and the handler then waits SIMULATED_WORK_MS before finishing.
 *
 * @returns {Promise<void>} Settles with the promise returned by
 *   `queue.process`, so processing failures reach the caller.
 */
const main = async () => {
  await queue.process(async (job) => {
    logger.info('消費者:', job.data);
    await sleep(SIMULATED_WORK_MS);
  });
};

// Surface failures instead of leaving an unhandled promise rejection.
main().catch((err) => {
  logger.error('Consumer failed:', err);
  process.exitCode = 1;
});

 

 

 

 

 

 

 文檔:

https://github.com/OptimalBits/bull/tree/develop/docs

api:

https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueprocess


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM