說明:僅記錄nodejs如何消費,如何生產並未做記錄,因為需求沒有用到。
開發環境
操作系統:windows10/windows11
開發工具:Visual Studio Code
Electron:vue-electron 1.0.6
NodeJS:16.14.2
RabbitMQ:3.8.1
說明:我是在KubeSphere的應用商店中安裝的RabbitMQ的。在哪安裝不重要,只要你安裝了RabbitMQ就行。關鍵是你要獲取到RabbitMQ的端口、賬號、密碼就行,后面會用到。
安裝amqplib
NodeJS如果要和RabbitMQ通信,amqplib是最好的選擇。npm安裝amqplib:
npm i amqplib
封裝RabbitMQ類
對使用RabbitMQ的代碼進行封裝,方便使用。
以下是關於導入amqplib包的介紹:
如果你准備使用Promise的方式處理連接、創建通道,那么你需要這么導入包: var amqp = require('amqplib'); 否則這么導入: var amqp = require('amqplib/callback_api'); 我沒有用Promise,所以使用的第二種方式。因為我在做斷線重連的時候,使用Promise方式沒整成功,所以就沒用Promise了。
咱做技術的肯定要方方面面都要考慮到,才能保證系統的穩定和高可用。很多網上資料都是簡單的介紹了下使用方法,斷線重連都沒介紹。所以基於socket的通信,斷線重連和粘包都是必須要考慮的。扯遠了。。。
下面直接貼代碼吧:
/* eslint-disable */ var amqp = require('amqplib/callback_api') /** * 封裝RabbitMQ的操作類 */ class RabbitMQ { // 構造方法 constructor(options) { if (!options) { this.rabbitmqSettings = { protocol: 'amqp', hostname: '192.168.3.101', port: 31802, username: 'admin', password: 'password' }; } this.connection = undefined; } // 接收信息 receiveQueueMsg(exchangeName, queueName, routingKey, callBack) { amqp.connect(this.rabbitmqSettings, (err, conn) => { console.log('開始創建連接'); if (err) { console.log('創建連接失敗 ', err) setTimeout(() => { this.receiveQueueMsg(exchangeName, queueName, routingKey, callBack); }, 3000); return; } console.log('創建連接成功'); this.connection = conn; conn.on('error', (err) => { console.log('connect_error ' + err.message, err) setTimeout(() => { this.receiveQueueMsg(exchangeName, queueName, routingKey, callBack); }, 3000); }) conn.createChannel(function (err, channel) { console.log("創建通道", err); channel.deleteQueue(queueName); // 接收端 channel.assertQueue(queueName, { durable: true, exclusive: false, autoDelete: false }, false); channel.bindQueue(queueName, exchangeName, routingKey); // 接收端,ack表示通知RabbitMQ確認收到 channel.consume(queueName, function (msg) { callBack && callBack(msg); channel.ack(msg); }, { noAck: false }); }); }); } //關閉連接 close() { this.connection.close(); console.log("頁面關閉,連接斷開。"); } } export default RabbitMQ
使用RabbitMQ封裝類
1.導入封裝的RabbitMQ類
import RabbitMQ from '@/utils/rabbitmq'
2.使用封裝的RabbitMQ類
export default { data () { return { mq: undefined } }, methods: { close() { this.mq.close(); }, open() { this.mq = new RabbitMQ(); this.mq.receiveQueueMsg('direct_exchange', 'queue_name_test_3', "message", (msg) => { if(msg !== null) { console.log(msg.content.toString()); } }); } } }
頁面上有2個按鈕,一個是打開,一個是關閉,這里就不貼代碼了。
代碼寫的有些瑕疵啊,比如在關閉mq之前,先判斷下mq是否存在,如果存在才能關閉。讀者自己可以補充。
學習資料,務必要看
重要!重要!重要!
重要的事情說三遍!!!https://amqp-node.github.io/amqplib/channel_api.html這個地址非常非常的重要,務必要收藏。里面有nodejs使用rabbitmq的各種API。是百度不到的,還是俺VPN翻到的。
代碼介紹
上面只是貼了代碼,但沒對代碼做解釋,不夠完美,下面補上。
構造方法
RabbitMQ封裝類有個構造方法:constructor,它有個參數:options。也就是說創建這個類的實例的時候,需要傳入options。它是一個對象,如果沒傳,那么在構造方法里面重新定義這個對象。主要包含:通信協議、IP地址、端口、rabbitmq的用戶名和密碼。我准備后期從配置信息讀取這些值。做成可配置的。
構造方法里面還有個connection字段,它是用來存放與rabbitmq連接對象的。主動關閉與rabbitmq的連接時會用到。
接收消息方法
先說需求:每個消費者都要完整消費數據。什么意思呢?比如生產者產生10條數據,每個消費者都要消費這10條。而不是將數據平均到每個消費者消費。
方法receiveQueueMsg(exchangeName, queueName, routingKey, callBack),有四個參數:
1.exchangeName:交換機名稱,rabbitmq使用交換機“分發”或者“路由”生產者產生的數據到不同的通道。生產者和消費者的exchangeName保持一致,且生產時聲明的exchange類型必須為:direct。
2.queueName:通道名稱,每個消費者的queueName可以相同,也可以不相同。但是我的需求就要求每個queueName必須不同。否則無法完整消費數據。
3.routingKey:第1點有提到交換機“分發”生產者產生的數據到不同的通道,那么這一點rabbitmq是怎么做到的呢?就是通過routingKey綁定exchangeName和queueName的關系來實現的。綁定之后一個routingKey對應一個exchangeName和多個queueName。我們在生產和消費的時候指定routingKey就可以了。
4.callBack:這個好理解,收到rabbitmq消息后的回調方法。
amqp.connect()
用來連接rabbitmq。它的第一個參數既可以是一個url,也可以是一個對象。比如:
amqp.connect('amqp://admin:password@192.168.3.101:31802', function (err, conn) {});
或者
amqp.connect(raabitmqSettings, function (err, conn) {});
conn.createChannel()
rabbitmq連接成功后,用來創建消息通道。
channel.deleteQueue(queueName)
創建通道之后,根據消息隊列名稱刪除之前的隊列,需求就這樣。
假如客戶端一開始是關着的,消息隊列中產生了很多數據,如果不刪除隊列,當我打開客戶端時我會獲取很多垃圾數據。而最新的一條數據才是我需要的。
channel.assertQueue()
用來聲明或者說是定義消息隊列。
channel.assertQueue(queueName, { durable: true, exclusive: false, autoDelete: false }, false);
第一個參數queueName:表示消息隊列名稱。
第二個參數是個對象:durable表示隊列是否持久化,注意不是消息是隊列;exclusive表示是否排他,如果為true別的連接(connection)看不到它;autoDelete如果為true,當消費者數量降至零時,隊列將被刪除(默認為false)。
channel.bindQueue()
用來綁定交換機、消息隊列、路由,前面也有提到
channel.bindQueue(queueName, exchange, routingKey);
第一個參數queueName:表示消息隊列名稱
第二個參數exchange:表示交換機名稱
第三個參數routingKey:表示路由名稱
關閉連接方法
close()
關閉連接方法處也有瑕疵,需要先判斷連接對象是否存在,如果存在才允許關閉。后面讀者可以自己修改完善。
斷線重連
主要是在“首次連接失敗”和“連接斷開”時,3秒后重新發起連接。
其它
類文件頂部有/* eslint-disable */這么一行字符,目的是告訴編譯器忽略eslint檢查。
生產端我用的是.net core操作rabbitmq的,demo可以移步到碼雲:https://gitee.com/subendong/RabbitMQ.Test。這個demo里的producer的配置有幾點需要注意:
1. 無需創建隊列
2. 無需綁定交換機、路由、隊列,由消費者綁定
3. 只需定義交換機,指定交換機類型為:direct