一、背景:
我司的系統,用戶可以創建任務,啟動任務,但任務的運行需要很長的時間,所以采用消息隊列的方式,后台異步處理。
這里所用到的是 RabbitMQ
,對應的 Node.js 庫為 amqplib
( 這里采用的是回調形式:require("amqplib/callback_api") )。
二、MQ 處理任務的流程
① ② ③ ④ ⑤ :從前端發來 HTTP 請求,被 Producer(express) 處理,經過 Route -> Controller -> Function ,使用 amqplib 的 sendToQueue(),發送需要處理的任務的 uuid 入 MQ 隊列。這時候,還要修改數據庫,把該任務的狀態從 "new" -> "queue"。
⑥ ⑦ ⑧ ⑨ ⑩ ⑪ : Consumer 消化 MQ 隊列吐出來的 message,即任務的 uuid。先修改數據庫該任務的狀態為"runnning", 然后調用"處理"模塊去執行復雜運算,執行完成后,修改數據庫該任務的狀態是 "success" 還是 ”fail”,然后返回 ack 信號給 MQ 。
注:這次的需求比較簡單,所以沒有用到 MQ 的交換機功能。
三、問與答
Q:如何做好 MQ 的錯誤處理?
MQ 的 connection 和 channel 對象都有 "error" 和 "close" 事件,需做好相關的日志記錄。尤其是 "error",要加上 reconnect 機制,防止因為某個任務導致的錯誤或者 MQ 自身的原因,影響到后續任務的處理。
connection.on("error", function(err) {
// reconnect
});
channel.on("error", function(err) {
// reconnect
});
最后可以根據實際需要,在全局加上 try……catch。
Q:如何保證 MQ 自身消息的數據安全?
為了防止 MQ server 的崩潰導致的消息損失,需要對數據做持久化。大致分兩塊:
隊列持久化 + 消息持久化
channel.assertQueue(queue_name, {
durable: true
// 隊列持久化
});
channel.sendToQueue(
queue_name,
Buffer.from(uuid),
{
persistent: true
// 消息持久化
},
function(err, ok) {
}
);
Q:如何保證 DB 跟 MQ 數據的一致性?
1、發送 message 時
④ 中的 sendToQueue() ,需要在 createConfirmChannel() 的基礎下使用,這樣 sendToQueue() 的第三個參數才有 MQ 收到 message 成功與否的回調,根據這個,去結合 ② 的 DB 操作, 綁定為事務,來保證數據的一致性。
2、接受 message 時
channel.consume() 需開啟 ack 模式,等 Consumer 端一切確認完成后,再通知 MQ 。
channel.consume(
queue_name,
function(msg) {
const uuid = msg.content.toString();
// use uuid todo……
},
{
noAck: false
}
);
Q:如何避免 MQ 多發、少發的問題
從上面的 如何保證 DB 跟 MQ 數據的一致性? 其實就避免了該問題的發生。
但是額外要做的是:
1、重試機制,例如 發送 message 失敗,規定重試的次數。
2、善用 MQ 的 Web 控制台,地址形如 http://localhost:15672。除了關注基本的服務器負載狀態,還要關注任務隊列是否正常吞吐,是否有卡殼。
3、構建運維一體的后台管控系統,比上面的 2 自定義程度更高。
4、提供用戶類似"提交工單"/"問題反饋"/"錯誤上傳"的功能,查缺補漏。