rabbitmq 延時隊列實現定時任務


場景

實際業務中對於定時任務的需求是不可避免的,例如,訂單超時自動取消、每天定時拉取數據等,在Node.js中系統層面提供了setTimeout、setInterval兩個API或通過node-schedule這種第三方庫來實現。通過這種方式實現對於簡單的定時任務是ok的,過於復雜的、可用性要求較高的系統就會存在以下缺點。

存在的一些問題

  • 消耗系統內存,如果定時任務很多,長時間得不到釋放,將會一直占用系統進程耗費內存。
  • 單線程如何保障出現系統崩潰后之前的定時任務不受影響?多進程集群模式下一致性的保證?
  • setTimeout、setInterval會存在時間誤差,對於時間精度要求較高的是不行的。

RabbitMQ TTL+DLX 實現定時任務

RabbitMQ本身是不支持的,可以通過它提供的兩個特性 Time-To-Live and ExpirationDead Letter Exchanges來實現,通過以下泳道圖可以看到一個消息從發布到消費的整個過程。

死信隊列

死信隊列全稱 Dead-Letter-Exchange 簡稱 DLX 是 RabbitMQ 中交換器的一種類型,消息在一段時間之后沒有被消費就會變成死信被重新 publish 到另一個 DLX 交換器隊列中,因此稱為死信隊列。
  • 死信隊列產生幾種情況

    • 消息被拒絕
    • 消息TTL過期
    • 隊列達到最大長度
  • 設置DLX的兩個參數:

    • deadLetterExchange: 設置DLX,當正常隊列的消息成為死信后會被路由到DLX中
    • deadLetterRoutingKey: 設置DLX指定的路由鍵

注意:Dead-Letter-Exchange也是一種普通的Exchange

消息TTL

消息的TTL指的是消息的存活時間,RabbitMQ支持消息、隊列兩種方式設置TTL,分別如下:

  • 消息設置TTL:對消息的設置是在發送時進行TTL設置,通過x-message-ttlexpiration 字段設置,單位為毫秒,代表消息的過期時間,每條消息的TTL可不同。

  • 隊列設置TTL:對隊列的設置是在消息入隊列時計算,通過 x-expires 設置,隊列中的所有消息都有相同的過期時間,當超過了隊列的超時設置,消息會自動的清除。

注意:如果以上兩種方式都做了設置,消息的TTL則以兩者之中最小的那個為准。

Nodejs操作RabbitMQ實現延遲隊列

推薦采用 amqplib庫,一個Node.js實現的RabbitMQ客戶端。

初始化RabbitMQ

const amqp = require('amqplib');
const log4js = require('log4js');
const logger = log4js.getLogger();
logger.level = 'info';
module.exports = {
  logger,
  init: () =>
    amqp.connect('amqp://122.51.9.11:5672').then((connection) => {
      logger.info('rabbitmq connect success');
      return connection;
    }),
};

生產者

const random = require('string-random');
const rabbitmq = require('./rabbitmq.js');
const logger = rabbitmq.logger;
const sleep = (time) => new Promise((resolve) => setTimeout(resolve, time));
async function producerDLX(connnection) {
  const testQueue = 'testQu';
  const testExchange = 'testEx';
  const testRoutingKey = 'testRoutingKey';
  const testExchangeDLX = 'testExDLX';
  const testRoutingKeyDLX = 'testRoutingKeyDLX';
  const msg = 'Producer';

  const ch = await connnection.createChannel();
  await ch.assertExchange(testExchange, 'direct', { durable: true });
  const queueResult = await ch.assertQueue(testQueue, {
    exclusive: false,
    messageTtl: 10000,
    deadLetterExchange: testExchangeDLX,
    deadLetterRoutingKey: testRoutingKeyDLX,
  });
  await ch.bindQueue(queueResult.queue, testExchange, testRoutingKey);

  for (let i = 0; i < 5; i++) {
    await sleep(2000);
    const cMsg = `${i}:${msg} 消息 =>${random(10)}`;
    logger.info(cMsg);
    await ch.publish(testExchange, testRoutingKey, Buffer.from(cMsg));
  }

  await ch.close();
}

// 發送消息
rabbitmq.init().then((connection) => producerDLX(connection));

 消費者

const rabbitmq = require('./rabbitmq.js');
const logger = rabbitmq.logger;
/**
 * 消費一個死信隊列
 * @param { Object } connnection
 */
async function consumerDLX(connnection) {
  const testExchangeDLX = 'testExDLX';
  const testRoutingKeyDLX = 'testRoutingKeyDLX';
  const testQueueDLX = 'testQueueDLX';

  const ch = await connnection.createChannel();

  await ch.assertExchange(testExchangeDLX, 'direct', { durable: true });
  const queueResult = await ch.assertQueue(testQueueDLX, {
    exclusive: false,
  });
  await ch.bindQueue(queueResult.queue, testExchangeDLX, testRoutingKeyDLX);
  await ch.consume(
    queueResult.queue,
    (msg) => {
      logger.info('consumer msg:', msg.content.toString());
    },
    { noAck: true }
  );
}

// 消費消息
rabbitmq.init().then((connection) => consumerDLX(connection));

分別執行消費者和生產者,可以看到 producer 在45秒發布了消息,consumer 是在55秒接收到的消息,實現了定時10秒種執行

 

 參考

https://www.jianshu.com/p/9ce0223aeb5e


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM