架構設計之NodeJS操作消息隊列RabbitMQ


一. 什么是消息隊列?

消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。

消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。

二. 常用的消息隊列有哪些?

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

甚至現在部分NoSQL也可做消息隊列,如Redis。

三. 消息隊列的使用場景?

  • 異步處理

  • 應用解耦

  • 流量削峰

四. 使用案例

上規模的公司都會有自己的日志分析系統,日志系統是怎么實現的呢?

 

圖解:用戶在訪問應用的時候,我們要記錄下用戶的操作記錄和系統的異常日志,常規的做法是將系統產生的日志保存到服務器磁盤,在服務器中開啟定時任務,定時將磁盤的日志信息傳入mq中(生產者),也定時將mq中的消息取出並存到相應的數據庫,如ElasticSearch或Hive中。

五. 如何安裝RabbitMQ?

上面的案例介紹了MQ的一個使用場景,我這里是用RabbitMQ舉例,現實項目中可能用到的是Kafka。

  1. 首先安裝brew(mac為例)

    /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 
  2. 安裝RabbitMQ

    brew install rabbitmq
  3. 運行RabbitMQ

    進入到 /usr/local/Cellar/rabbitmq/3.7.7,執行

    sbin/rabbitmq-server
  4. 啟動插件

    進入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

    ./rabbitmq-plugins enable rabbitmq_management
  5. 登陸管理界面

    打開瀏覽器輸入:http://localhost:15672,RabbitMQ默認15672端口六. Nodejs操作RabbitMQ

     

 

網上可以找到好幾個相應的Node SDK,這里推薦amqplib

1. 生產者

/**
 * 對RabbitMQ的封裝
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts = [];
        this.index = 0;
        this.length = this.hosts.length;
        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

 

2. 消費者

/**
 * 對RabbitMQ的封裝
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }

3. 通過生產者向MQ發送一個消息,並創建隊列

let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', 'my first message', (error) => {
    console.log(error)
})

執行之后,我們打開管理平台,發現RabbbitMQ已經接受到了一條消息:

並且RabbbitMQ新增了一個隊列testQueue

4. 獲取指定隊列的消息

let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) => 
{    
   console.log(msg)
})
// 輸出結果:my first message復制代碼

此時打開RabbitMQ管理平台,消息數量已經變為0

綜上:我們簡單講述了消息隊列及RabbitMQ相關的一些知識,以及我們如何通過nodejs來生產與消費消息,上面講的比較簡單,之后會發表更多文章講述消息隊列集群搭建及容災的實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM