RabbitMQ 學習筆記


環境:

MacOS 10.14

Node.js 8.9.1

零、背景


目前有個上線應用會接受多個請求,且每個請求的處理時間可能很久,可能到數小時,所以就想采用異步機制,至於復雜的運算就用消息隊列(MQ)去慢慢消化。

網上調研了一圈,遂采用RabbitMQ。

一、安裝


1、安裝

(1) MacOS
brew install rabbitmq
(2) CentOS (Linux)

https://tecadmin.net/install-rabbitmq-on-centos/

2、配置環境變量

export PATH=$PATH:/usr/local/opt/rabbitmq/sbin

3、使用

(1) 服務器端

rabbitmq-server

啟動需要(默認)200 MB的磁盤空間,但可以通過配置文件里的 disk_free_limit 修改。

(2) 客戶端

以 Node.js 為例:

npm i amqplib

https://www.npmjs.com/package/amqplib

var amqp = require('amqplib/callback_api');

4、用 rabbitmq management 進行后台管理

(1) 開啟服務

rabbitmq-plugins enable rabbitmq_management

此時/etc/rabbitmq下會多出enabled_plugins文件,內容為:

[rabbitmq_management].

此時 rabbitmq management 的地址為http://localhost:15672,默認用戶密碼為 guest/guest

但此時外網訪問,登錄時會提示 User can only log in via localhost

這是由於 rabbitmq 從 3.3.0 開始禁止使用 guest/guest 權限通過除 localhost 外的訪問。

(2) 開啟外網訪問

1、rabbitmq 初始並沒有創建配置文件,需要自行拷貝。

cp /usr/share/doc/rabbitmq-server-3.7.9/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

2、修改此配置文件rabbitmq.config

vim /etc/rabbitmq/rabbitmq.config

把 loopback_users 的注釋解開:

%%{loopback_users, []} -> {loopback_users, []}

這里請小心 {loopback_users, []} 后的逗號可能需要去掉,不然格式會報錯。

3、重啟服務

sudo systemctl restart rabbitmq-server

注:可能在訪問的時候會報這樣的錯:


解決辦法: 關閉全局 ss 代理

二、使用


1、connection —— 連接

amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) {
        throw error0;
    } 
    // …… 
    // connection.close(); 
});

2、channel —— 通道

通道分為:

生產者(發送者)

消費者(接收者)

connection.createChannel(function(error1, channel) {
		if (error1) {
    		throw error1;
    }
 		// channel …… 
});
(1) queue —— 隊列

隊列里面塞入的是消息

生產者

var queue = 'queue_name';


# 創建or連上隊列
channel.assertQueue(queue, {
		durable: true				# 隊列持久化
}); 
# 臨時隊列(當前 connection 斷掉后就會被刪除)—— 隊列名隨機
channel.assertQueue('',{ exclusive:true });


# 將消息塞入隊列
channel.sendToQueue(queue, Buffer.from(msg), {
    persistent: true		# 消息持久化
});
關於持久化:

一個是防止服務器端的隊列丟失,一個是防止服務器端的隊列里的消息丟失。

但是這並不能避免:如果服務器端在RabbitMQ接受消息的過程中掛了導致的消息丟失。如果需要更強的保證,可以使用 發布者確認

消費者

var queue = 'queue_name';

# 從隊列取出消息
channel.consume(queue, function(msg) {
		channel.ack(msg);	# 發送確認信號
}, {
    noAck: false	
});
關於 ACK ( Acknowledgement )

noAck: true 則服務器端不會期望收到 ACK,也就是說,消息在被發送后會立即出列。

noAck: false 則需要消費者發送 ACK,即channel.ack(msg); ,但如果超時未回復 ACK,消息會重新排隊(但如果同時有其他可用消費者,則會迅速安排過去)

查看當前有多少隊列及各中有多少消息: sudo rabbitmqctl list_queues

(2) prefetch —— 預取
channel.prefetch(1);
# 表示這個通道如果有{1}個未完成的消息,則不會接受新的消息
(3) exchange —— 交換

如果有多個隊列,生產者的消息應該如何分配呢?這個時候就需要一個中間件——交換

其中交換類型有四種:“”(默認交換), topic, headers, fanout

A、 “”(默認交換)

RabbitMQ中消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列。所以不建議使用channel.sendToQueue(),此為 “”(默認交換)。

如果沒有隊列綁定到交換,消息將會丟失。

B、fanout(廣播)

生產者

var exchange = 'logs';

# 創建or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false	# 持久化
});	

### 推消息給交換
channel.publish(exchange, '', Buffer.from(msg));

消費者

var exchange = 'logs';

# 創建or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false	# 持久化
});

# ------------------------

# 綁定 交換+隊列
channel.bindQueue('queue_1', exchange, ''); 
channel.bindQueue('queue_2', exchange, ''); 
channel.bindQueue('queue_3', exchange, ''); 

queue_1、queue_2、queue_3 都會收到相同的一條消息。

C、direct (直接)

生產者

var exchange = 'logs';

# 創建or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false	# 持久化
});	

### 推消息給交換
channel.publish(exchange, 'black', Buffer.from(msg));

消費者

var exchange = 'logs';

# 創建or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false	# 持久化
});

# ------------------------
 
# 綁定 交換+隊列
channel.bindQueue('queue_1', exchange, 'white');
channel.bindQueue('queue_2', exchange, 'black');
channel.bindQueue('queue_3', exchange, 'red');

只有 queue_2 才會收到消息。

D、topic

生產者

var exchange = 'logs';

# 創建or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false	# 持久化
});	

### 推消息給交換
channel.publish(exchange, 'kern.critical', Buffer.from(msg));

消費者

var exchange = 'logs';

# 創建or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false	# 持久化
});

# ------------------------
 
# 綁定 交換+隊列
channel.bindQueue('queue_1', exchange, '#');
channel.bindQueue('queue_2', exchange, "kern.*");
channel.bindQueue('queue_3', exchange, "*.critical");
  • *(星號)可以替代一個單詞。
  • #(hash)可以替換零個或多個單詞。

查看所有的 交換 及 交換綁定隊列
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings
代碼職責風格:

生產者只管發送消息就好 (比如發送消息給隊列或者交換)

消費者要負責接受消息以外的更多事 (比如負責隊列的 prefetch 設置,或者交換的綁定)

3、遠程過程調用(RPC)

三、應用

例如可以用到日志系統中:對所有等級的日志都打印到控制台(即下面的隊列),而 error 日志單獨持久化到 disk(即上面的隊列)。

四、MQ 的優缺點


優點:解耦、異步、削峰
缺點:系統可用性降低,系統復雜性增加


參考資料

1、官方RabbitMQ教程

https://www.rabbitmq.com/getstarted.html

2、amqp.node 參考API

https://www.squaremobius.net/amqp.node/channel_api.html#channel_ack


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM