rabbitmq Exchange四種模式


一、什么是Exchange

RabbitMQ 是 AMQP(高級消息隊列協議)的標准實現:

 

 

 

從 AMQP 協議可以看出,Queue、Exchange 和 Binding 構成了 AMQP 協議的核心

  • Producer:消息生產者,即投遞消息的程序。

  • Broker:消息隊列服務器實體。

    • Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。

    • Binding:綁定,它的作用就是把 Exchange 和 Queue 按照路由規則綁定起來。

    • Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。

  • Consumer:消息消費者,即接受消息的程序。

二、Exchange的類型

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種

fanout 

fanout類型的Exchange路由規則非常簡單,它會把所有發送到fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的所有Queue上。

Fanout Exchange 不需要處理RouteKey 。只需要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的所有隊列上。類似子網廣播,每台子網內的主機都獲得了一份復制的消息。所以,Fanout Exchange 轉發消息是最快的。

direct

direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中。

direct Exchange是RabbitMQ Broker的默認Exchange,它有一個特別的屬性對一些簡單的應用來說是非常有用的,在使用這個類型的Exchange時,可以不必指定routing key的名字,在此類型下創建的Queue有一個默認的routing key,這個routing key一般同Queue同名。

 
 

direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何綁定(binding)操作 。消息傳遞時,RouteKey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。

topic

前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同,它約定:

a)、routing key為一個句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
b)、binding key與routing key一樣也是句點號“. ”分隔的字符串
c)、binding key中可以存在兩種特殊字符"*" 與“#”,用於做模糊匹配,其中" * "用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)

 

 

所有發送到Topic Exchange的消息被轉發到所有關心RouteKey中指定Topic的Queue上,Exchange 將RouteKey 和某Topic 進行模糊匹配。此時隊列需要綁定一個Topic。可以使用通配符進行模糊匹配,符號“#”匹配一個或多個詞,符號"*" 匹配不多不少一個詞。因此“log.#”能夠匹配到“log.info.oa”,但是“log.*” 只會匹配到“log.error”

headers

headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對;如果完全匹配則消息會路由到該Queue,否則不會路由到該Queue。

 

示例代碼
生產者
const amqp = require('amqplib');

async function producer() {
  try {
    // 1. 創建鏈接對象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. 獲取通道
    const channel = await connection.createChannel();

    // 3. 聲明參數
    const exchangeName = 'qosEx';
    const routingKey = 'qos.test001.cool';
    const msg = 'Producer';

    // 4. 聲明交換機
    await channel.assertExchange(exchangeName, 'topic', { durable: true });

    for (let i = 0; i < 5; i++) {
      // 5. 發送消息
      console.log(i);
      await channel.publish(
        exchangeName,
        routingKey,
        Buffer.from(`${msg} 第${i}條消息`)
      );
    }

    await channel.close();
  } catch (error) {
    console.log(error);
  }
}

producer();

消費者

const amqp = require('amqplib');

async function consumer() {
  // 1. 創建鏈接對象
  const connection = await amqp.connect('amqp://122.51.9.11:5672');

  // 2. 獲取通道
  const channel = await connection.createChannel();

  // 3. 聲明參數
  const exchangeName = 'qosEx';
  const queueName = 'qosQueue';
  const bindingKey = 'qos.#';
  // const bindingKey = 'qos.*';  無法匹配

  // 4. 聲明交換機、對列進行綁定
  await channel.assertExchange(exchangeName, 'topic', { durable: true });
  await channel.assertQueue(queueName);
  await channel.bindQueue(queueName, exchangeName, bindingKey);

  // 5. 限流參數設置
  await channel.prefetch(1, false);// count:每次推送給消費端 N 條消息數目,如果這 N 條消息沒有被ack,生產端將不會再次推送直到這 N 條消息被消費。global:在哪個級別上做限制,ture 為 channel 上做限制,false 為消費端上做限制,默認為 false。
  // 6. 限流,noAck參數必須設置為false
  await channel.consume(
    queueName,
    (msg) => {
      console.log('Consumer:', msg.content.toString());
      channel.ack(msg);
    },
    { noAck: false }
  );
}

consumer();

 

參考鏈接:https://www.jianshu.com/p/19af0f40bbde




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM