一、什么是Exchange
RabbitMQ 是 AMQP(高級消息隊列協議)的標准實現:
從 AMQP 協議可以看出,Queue、Exchange 和 Binding 構成了 AMQP 協議的核心
-
Producer:消息生產者,即投遞消息的程序。
-
Broker:消息隊列服務器實體。
-
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
-
Binding:綁定,它的作用就是把 Exchange 和 Queue 按照路由規則綁定起來。
-
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
-
-
Consumer:消息消費者,即接受消息的程序。
二、Exchange的類型
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種
fanout
fanout類型的Exchange路由規則非常簡單,它會把所有發送到fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的所有Queue上。
direct
direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中。
direct Exchange是RabbitMQ Broker的默認Exchange
,它有一個特別的屬性對一些簡單的應用來說是非常有用的,在使用這個類型的Exchange時,可以不必指定routing key的名字,在此類型下創建的Queue有一個默認的routing key,這個routing key一般同Queue同名。

direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何綁定(binding)操作 。消息傳遞時,RouteKey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。
topic
前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同,它約定:
b)、binding key與routing key一樣也是句點號“. ”分隔的字符串
c)、binding key中可以存在兩種特殊字符"*" 與“#”,用於做模糊匹配,其中" * "用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)

headers
headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對;如果完全匹配則消息會路由到該Queue,否則不會路由到該Queue。

示例代碼
const amqp = require('amqplib'); async function producer() { try { // 1. 創建鏈接對象 const connection = await amqp.connect('amqp://localhost:5672'); // 2. 獲取通道 const channel = await connection.createChannel(); // 3. 聲明參數 const exchangeName = 'qosEx'; const routingKey = 'qos.test001.cool'; const msg = 'Producer'; // 4. 聲明交換機 await channel.assertExchange(exchangeName, 'topic', { durable: true }); for (let i = 0; i < 5; i++) { // 5. 發送消息 console.log(i); await channel.publish( exchangeName, routingKey, Buffer.from(`${msg} 第${i}條消息`) ); } await channel.close(); } catch (error) { console.log(error); } } producer();
消費者
const amqp = require('amqplib'); async function consumer() { // 1. 創建鏈接對象 const connection = await amqp.connect('amqp://122.51.9.11:5672'); // 2. 獲取通道 const channel = await connection.createChannel(); // 3. 聲明參數 const exchangeName = 'qosEx'; const queueName = 'qosQueue'; const bindingKey = 'qos.#'; // const bindingKey = 'qos.*'; 無法匹配 // 4. 聲明交換機、對列進行綁定 await channel.assertExchange(exchangeName, 'topic', { durable: true }); await channel.assertQueue(queueName); await channel.bindQueue(queueName, exchangeName, bindingKey); // 5. 限流參數設置 await channel.prefetch(1, false);// count:每次推送給消費端 N 條消息數目,如果這 N 條消息沒有被ack,生產端將不會再次推送直到這 N 條消息被消費。global:在哪個級別上做限制,ture 為 channel 上做限制,false 為消費端上做限制,默認為 false。
// 6. 限流,noAck參數必須設置為false await channel.consume( queueName, (msg) => { console.log('Consumer:', msg.content.toString()); channel.ack(msg); }, { noAck: false } ); } consumer();