對於amqplib的使用心得


     最近在nodejs使用了amqplib--rabbitmq的nodejs客戶端。封裝在了express中,先來代碼。

   

 1 var amqp = require('amqplib/callback_api');
 2 var config=require('../config/config');
 3 var log=require('../util/loghelp');
 4 function fail(err, conn) {
 5     log.error(err);
 6     if (conn) conn.close();
 7 }
 8 exports.StartConsumer=function (action,qname) {
 9     function on_connect(err, conn) {
10         if (err !== null) return fail(err);
11         function on_channel_open(err, ch) {
12             ch.assertQueue(qname, {durable: true}, function(err, ok) {
13                 if (err !== null) return bail(err, conn);
14                 ch.consume(qname, function(msg) {
15 
16                     log.info(`Received ${msg.content.toString()},start process`);
17                     action(JSON.parse(msg.content))
18                         .then(d=> {
19                             log.info("mq 處理成功,確認");ch.ack(msg)
20                         }
21                            )
22                         .catch(err=>
23                         ch.nack(msg));
24                 }, {noAck: false} );
25             });
26         }
27         conn.createChannel(on_channel_open);
28     }
29     amqp.connect(config.amqp.url,on_connect);
30 };
31 
32 exports.enqueue=function (data,qname) {
33     function on_connect(err, conn) {
34         if (err !== null) return bail(err);
35 
36         function on_channel_open(err, ch) {
37             if (err !== null) return bail(err, conn);
38             ch.assertQueue(qname, {durable: true}, function(err, ok) {
39                 if (err !== null) return bail(err, conn);
40                 var msg=JSON.stringify(data);
41                 ch.sendToQueue(qname, new Buffer(msg));
42                 log.info(`mq send ${msg}`);
43                 ch.close(function() { conn.close(); });
44             });
45         }
46         conn.createChannel(on_channel_open);
47     }
48     amqp.connect(config.amqp.url,on_connect);
49 };

    其中StartConsumer 會在項目啟動時啟動,在整個生命周期中一直保持監聽狀態,在程序結束時mq的鏈接關閉。需要注意的是 noAck 這個參數,當為false是表示消息出隊后不會自動刪除,如果設置成true,則無論消息處理成功與否此消息會被刪除。注意到在消息不成功是,調用了ch.nack(msg)),此方法是將消息重新入隊。

   而enqueue 則是消息入隊列后連接立刻關閉,以免占用資源。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM