最近在nodejs使用了amqplib--rabbitmq的nodejs客戶端。封裝在了express中,先來代碼。
1 var amqp = require('amqplib/callback_api'); 2 var config=require('../config/config'); 3 var log=require('../util/loghelp'); 4 function fail(err, conn) { 5 log.error(err); 6 if (conn) conn.close(); 7 } 8 exports.StartConsumer=function (action,qname) { 9 function on_connect(err, conn) { 10 if (err !== null) return fail(err); 11 function on_channel_open(err, ch) { 12 ch.assertQueue(qname, {durable: true}, function(err, ok) { 13 if (err !== null) return bail(err, conn); 14 ch.consume(qname, function(msg) { 15 16 log.info(`Received ${msg.content.toString()},start process`); 17 action(JSON.parse(msg.content)) 18 .then(d=> { 19 log.info("mq 處理成功,確認");ch.ack(msg) 20 } 21 ) 22 .catch(err=> 23 ch.nack(msg)); 24 }, {noAck: false} ); 25 }); 26 } 27 conn.createChannel(on_channel_open); 28 } 29 amqp.connect(config.amqp.url,on_connect); 30 }; 31 32 exports.enqueue=function (data,qname) { 33 function on_connect(err, conn) { 34 if (err !== null) return bail(err); 35 36 function on_channel_open(err, ch) { 37 if (err !== null) return bail(err, conn); 38 ch.assertQueue(qname, {durable: true}, function(err, ok) { 39 if (err !== null) return bail(err, conn); 40 var msg=JSON.stringify(data); 41 ch.sendToQueue(qname, new Buffer(msg)); 42 log.info(`mq send ${msg}`); 43 ch.close(function() { conn.close(); }); 44 }); 45 } 46 conn.createChannel(on_channel_open); 47 } 48 amqp.connect(config.amqp.url,on_connect); 49 };
其中StartConsumer 會在項目啟動時啟動,在整個生命周期中一直保持監聽狀態,在程序結束時mq的鏈接關閉。需要注意的是 noAck 這個參數,當為false是表示消息出隊后不會自動刪除,如果設置成true,則無論消息處理成功與否此消息會被刪除。注意到在消息不成功是,調用了ch.nack(msg)),此方法是將消息重新入隊。
而enqueue 則是消息入隊列后連接立刻關閉,以免占用資源。
