Electron NodeJS 訂閱和消費RabbitMQ詳細筆記


說明:僅記錄nodejs如何消費,如何生產並未做記錄,因為需求沒有用到。

開發環境

操作系統:windows10/windows11

開發工具:Visual Studio Code

Electron:vue-electron 1.0.6

NodeJS:16.14.2

RabbitMQ:3.8.1

說明:我是在KubeSphere的應用商店中安裝的RabbitMQ的。在哪安裝不重要,只要你安裝了RabbitMQ就行。關鍵是你要獲取到RabbitMQ的端口、賬號、密碼就行,后面會用到。

安裝amqplib

NodeJS如果要和RabbitMQ通信,amqplib是最好的選擇。npm安裝amqplib:

npm i amqplib

封裝RabbitMQ類

對使用RabbitMQ的代碼進行封裝,方便使用。

以下是關於導入amqplib包的介紹:

如果你准備使用Promise的方式處理連接、創建通道,那么你需要這么導入包: var amqp = require('amqplib'); 否則這么導入: var amqp = require('amqplib/callback_api'); 我沒有用Promise,所以使用的第二種方式。因為我在做斷線重連的時候,使用Promise方式沒整成功,所以就沒用Promise了。

咱做技術的肯定要方方面面都要考慮到,才能保證系統的穩定和高可用。很多網上資料都是簡單的介紹了下使用方法,斷線重連都沒介紹。所以基於socket的通信,斷線重連和粘包都是必須要考慮的。扯遠了。。。

下面直接貼代碼吧:

/* eslint-disable */
var amqp = require('amqplib/callback_api')

/**
 * 封裝RabbitMQ的操作類
 */
class RabbitMQ {
    // 構造方法
    constructor(options) {
        if (!options) {
            this.rabbitmqSettings = {
                protocol: 'amqp',
                hostname: '192.168.3.101',
                port: 31802,
                username: 'admin',
                password: 'password'
            };
        }

        this.connection = undefined;
    }
 
    // 接收信息
    receiveQueueMsg(exchangeName, queueName, routingKey, callBack) {
        amqp.connect(this.rabbitmqSettings, (err, conn) => {
            console.log('開始創建連接');
            if (err) {
                console.log('創建連接失敗 ', err)
                setTimeout(() => {
                    this.receiveQueueMsg(exchangeName, queueName, routingKey, callBack);
                }, 3000);
                return;
            }
            console.log('創建連接成功');
            this.connection = conn;
            conn.on('error', (err) => {
                console.log('connect_error ' + err.message, err)
                setTimeout(() => {
                    this.receiveQueueMsg(exchangeName, queueName, routingKey, callBack);
                }, 3000);
            })
            conn.createChannel(function (err, channel) {
                console.log("創建通道", err);
                channel.deleteQueue(queueName);
                // 接收端
                channel.assertQueue(queueName, { durable: true, exclusive: false, autoDelete: false }, false);
                channel.bindQueue(queueName, exchangeName, routingKey);
                // 接收端,ack表示通知RabbitMQ確認收到
                channel.consume(queueName, function (msg) {
                    callBack && callBack(msg);
                    channel.ack(msg);
                }, { noAck: false });
            });
        });
    }

    //關閉連接
    close() {
        this.connection.close();
        console.log("頁面關閉,連接斷開。");
    }
}

export default RabbitMQ

使用RabbitMQ封裝類

1.導入封裝的RabbitMQ類

 import RabbitMQ from '@/utils/rabbitmq' 

2.使用封裝的RabbitMQ類

export default {
  data () {
    return {
      mq: undefined
    }
  },
  methods: {
    close() {
      this.mq.close();
    },
    open() {
      this.mq = new RabbitMQ();
      this.mq.receiveQueueMsg('direct_exchange', 'queue_name_test_3', "message", (msg) =>
      {
        if(msg !== null) {
          console.log(msg.content.toString());
        }
      });
    }
  }
}

頁面上有2個按鈕,一個是打開,一個是關閉,這里就不貼代碼了。

代碼寫的有些瑕疵啊,比如在關閉mq之前,先判斷下mq是否存在,如果存在才能關閉。讀者自己可以補充。

學習資料,務必要看

重要!重要!重要!

重要的事情說三遍!!!https://amqp-node.github.io/amqplib/channel_api.html這個地址非常非常的重要,務必要收藏。里面有nodejs使用rabbitmq的各種API。是百度不到的,還是俺VPN翻到的。

代碼介紹

上面只是貼了代碼,但沒對代碼做解釋,不夠完美,下面補上。

構造方法

RabbitMQ封裝類有個構造方法:constructor,它有個參數:options。也就是說創建這個類的實例的時候,需要傳入options。它是一個對象,如果沒傳,那么在構造方法里面重新定義這個對象。主要包含:通信協議、IP地址、端口、rabbitmq的用戶名和密碼。我准備后期從配置信息讀取這些值。做成可配置的。

構造方法里面還有個connection字段,它是用來存放與rabbitmq連接對象的。主動關閉與rabbitmq的連接時會用到。

接收消息方法

先說需求:每個消費者都要完整消費數據。什么意思呢?比如生產者產生10條數據,每個消費者都要消費這10條。而不是將數據平均到每個消費者消費。

方法receiveQueueMsg(exchangeName, queueName, routingKey, callBack),有四個參數:

1.exchangeName:交換機名稱,rabbitmq使用交換機“分發”或者“路由”生產者產生的數據到不同的通道。生產者和消費者的exchangeName保持一致,且生產時聲明的exchange類型必須為:direct。

2.queueName:通道名稱,每個消費者的queueName可以相同,也可以不相同。但是我的需求就要求每個queueName必須不同。否則無法完整消費數據。

3.routingKey:第1點有提到交換機“分發”生產者產生的數據到不同的通道,那么這一點rabbitmq是怎么做到的呢?就是通過routingKey綁定exchangeName和queueName的關系來實現的。綁定之后一個routingKey對應一個exchangeName和多個queueName。我們在生產和消費的時候指定routingKey就可以了。

4.callBack:這個好理解,收到rabbitmq消息后的回調方法。

amqp.connect()

用來連接rabbitmq。它的第一個參數既可以是一個url,也可以是一個對象。比如:

amqp.connect('amqp://admin:password@192.168.3.101:31802', function (err, conn) {});

或者

amqp.connect(raabitmqSettings, function (err, conn) {});

conn.createChannel()

rabbitmq連接成功后,用來創建消息通道。

channel.deleteQueue(queueName)

創建通道之后,根據消息隊列名稱刪除之前的隊列,需求就這樣。

假如客戶端一開始是關着的,消息隊列中產生了很多數據,如果不刪除隊列,當我打開客戶端時我會獲取很多垃圾數據。而最新的一條數據才是我需要的。

channel.assertQueue()

用來聲明或者說是定義消息隊列。

channel.assertQueue(queueName, { durable: true, exclusive: false, autoDelete: false }, false);

第一個參數queueName:表示消息隊列名稱。

第二個參數是個對象:durable表示隊列是否持久化,注意不是消息是隊列;exclusive表示是否排他,如果為true別的連接(connection)看不到它;autoDelete如果為true,當消費者數量降至零時,隊列將被刪除(默認為false)。

channel.bindQueue()

用來綁定交換機、消息隊列、路由,前面也有提到

channel.bindQueue(queueName, exchange, routingKey);

第一個參數queueName:表示消息隊列名稱

第二個參數exchange:表示交換機名稱

第三個參數routingKey:表示路由名稱

關閉連接方法

close()

關閉連接方法處也有瑕疵,需要先判斷連接對象是否存在,如果存在才允許關閉。后面讀者可以自己修改完善。

斷線重連

主要是在“首次連接失敗”和“連接斷開”時,3秒后重新發起連接。

其它

類文件頂部有/* eslint-disable */這么一行字符,目的是告訴編譯器忽略eslint檢查。

生產端我用的是.net core操作rabbitmq的,demo可以移步到碼雲:https://gitee.com/subendong/RabbitMQ.Test。這個demo里的producer的配置有幾點需要注意:

1. 無需創建隊列
2. 無需綁定交換機、路由、隊列,由消費者綁定
3. 只需定義交換機,指定交換機類型為:direct


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM