rabbitMQ 中三種常用交換機:direct、topic、fanout


在rabbitmq中有許多交換機,不同的交換機適用於不同的場景。如下:

這么多交換機中,最常用的交換機有三種:direct、topic、fanout。我分別叫他們:“直接連接交換機”,“主題路由匹配交換機”,“無路由交換機”。以下是詳細的介紹:

Direct 交換機

這個交換機就是一個直接連接交換機,什么叫做直接連接交換機呢?

所謂“直接連接交換機”就是:Producer(生產者)投遞的消息被DirectExchange (交換機)轉發到通過routingkey綁定到具體的某個Queue(隊列),把消息放入隊列,然后Consumer從Queue中訂閱消息。

以下是代碼實例:

消費者:
package com.wy.testrabbitmq.testdirect;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:56
*/
public class Consumer {
   public static void main(String[] args) throws Exception {
       //創建連接工廠
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("127.0.0.1");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       //創建連接
       Connection connection = connectionFactory.newConnection();
       // 創建通道
       Channel channel = connection.createChannel();
       //交換機名
       String exchangeName = "testDirectExchange";
       //隊列
       String queueName = "test002";
       //routingkey
       String routingkey = "testDirect";
       //交換機類型
       String exchangeType = "direct";
       /**
        * 聲明一個交換機
        */
       channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
       /**
        * 聲明一個隊列
        * 第一個參數表示這個信道連接哪個隊列
        * 第二個參數表示是否持久化,當這個參數設置為true,即使你的服務器關了從新開數據還是存在的
        * 第三個參數表示是否獨占隊列,也就是所只能自己去監聽這個隊列
        * 第四個參數表示隊列脫離綁定時是否自動刪除
        * 第五個參數表示擴展參數,可設置為null
        */
       channel.queueDeclare(queueName, true, false, false, null);

       //建立綁定關系(哪個隊列,哪個交換機,綁定哪個routingkey)
       channel.queueBind(queueName, exchangeName, routingkey);

       //創建消費者,指定建立在那個連接上
       QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
       //設置channel
       // 第二個參數 是否自動簽收
       // 第三個參數表示消費對象
       channel.basicConsume(queueName, true, queueingConsumer);
       //獲取消息
       //int i=0;
       while (true) {
           // 沒有消息就阻塞
           QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
           String message = new String(delivery.getBody());
           System.out.println("消費端接收消息:" + message);
       }
   }
}

生產者:
package com.wy.testrabbitmq.testdirect;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**Direct交換機
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:16
*/
public class Producer {
   public static void main(String[] args) throws Exception {
       //創建連接工廠
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("127.0.0.1");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       //創建連接
       Connection connection = connectionFactory.newConnection();
       // 創建通道
       Channel channel=connection.createChannel();
      //聲明交換機
       String exchangeName="testDirectExchange";
        // routingkey
       String routingkey="testDirect";
       // 發送消息
       String message="這是一條測試direct的數據";
       channel.basicPublish(exchangeName,routingkey,null,message.getBytes());
       //注意:關閉連接
       channel.close();
       connection.close();
   }
}

  

在管控台Exchange中可以看到多了一個交換機:


在這里插入圖片描述
點擊testDirectExchange中Bindings可以看到我們的Routingkey:testDirect和綁定的隊列test002


在這里插入圖片描述

點擊test002可以快速進入到隊列中,點擊binding可以查看到隊列綁定的交換機。

在這里插入圖片描述

想一下,是不是:生產者發送消息到DirectExchange交換機,交換機根據routingkey轉發消息到綁定的Queue,供消費者消費。

 

 

Topic 交換機

舉個現實生活中的栗子:

假如你想在淘寶上買一雙運動鞋,那么你是不是會在搜索框中搜“XXX運動鞋”,這個時候系統將會模糊匹配的所有符合要求的運動鞋,然后展示給你。

所謂“主題路由匹配交換機”也是這樣一個道理,但是使用時也有一定的規則。

比如:

String routingkey = “testTopic.#”;
String routingkey = “testTopic.*”;

  • *表示只匹配一個詞
  • #表示匹配多個詞

具體看示例代碼和演示效果

消費者:
package com.wy.testrabbitmq.testtopic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:56
*/
public class Consumer {
   public static void main(String[] args) throws Exception {
       //創建連接工廠
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("127.0.0.1");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       //創建連接
       Connection connection = connectionFactory.newConnection();
       // 創建通道
       Channel channel = connection.createChannel();
       //交換機名
       String exchangeName = "testTopicExchange";
       //隊列
       String queueName = "test002";
       //routingkey
       //String routingkey = "testTopic.#";
       String routingkey = "testTopic.#";

       //交換機類型
       String exchangeType = "topic";
       /**
        * 聲明一個交換機
        */
       channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
       /**
        * 聲明一個隊列
        * 第一個參數表示這個信道連接哪個隊列
        * 第二個參數表示是否持久化,當這個參數設置為true,即使你的服務器關了從新開數據還是存在的
        * 第三個參數表示是否獨占隊列,也就是所只能自己去監聽這個隊列
        * 第四個參數表示隊列脫離綁定時是否自動刪除
        * 第五個參數表示擴展參數,可設置為null
        */
       channel.queueDeclare(queueName, true, false, false, null);

       //建立綁定關系(哪個隊列,哪個交換機,綁定哪個routingkey)
       channel.queueBind(queueName, exchangeName, routingkey);
       //創建消費者,指定建立在那個連接上
       QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
       //設置channel
       // 第二個參數 是否自動簽收
       // 第三個參數表示消費對象
       channel.basicConsume(queueName, true, queueingConsumer);
       //獲取消息
       //int i=0;
       while (true) {
           // 沒有消息就阻塞
           QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
           String message = new String(delivery.getBody());
           System.out.println("消費端接收消息:" + message);
       }
   }
}

生產者:
package com.wy.testrabbitmq.testtopic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**交換機
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:16
*/
public class Producer {
   public static void main(String[] args) throws Exception {
       //創建連接工廠
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("127.0.0.1");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       //創建連接
       Connection connection = connectionFactory.newConnection();
       // 創建通道
       Channel channel=connection.createChannel();
      //聲明交換機
       String exchangeName="testTopicExchange";
        // routingkey
       String routingkey="testTopic.qqq";
       String routingkey1 = "testTopic.test1";
       String routingkey2 = "testTopic.test2.test";
       String routingkey3 = "testTopic.test.test";
       // 發送消息
       String message="這是一條測試direct的數據";
       channel.basicPublish(exchangeName,routingkey,null,message.getBytes());
       channel.basicPublish(exchangeName,routingkey1,null,message.getBytes());
       channel.basicPublish(exchangeName,routingkey2,null,message.getBytes());
       channel.basicPublish(exchangeName,routingkey3,null,message.getBytes());
       //注意:關閉連接
       channel.close();
       connection.close();
   }
}

  

#運行效果:可以看到以testTopic.開頭的所有routingkey都匹配成功了,有四條數據。
在這里插入圖片描述
把代碼中#改成*運行效果:可以看到以testTopic.開頭的routingkey只匹配了一個詞,有兩條數據。
在這里插入圖片描述

總結:

#號與 *號就好像我們sql里面的%與_ ,表示匹配多個和只能匹配一個。

注意:

如果你路由匹配了#又不想匹配#,換成了匹配*,請記得去解綁。
如下:查看管控台你會發現他綁定了兩個。

在這里插入圖片描述
如果你換綁了,請記得解綁,不然出來的數據是既符合#,又符合*,就像並集一樣。

Fanout 交換機

“無路由交換機”,說白了就是,使用這個交換機不需要routingkey綁定,和路由沒有關系,它是直接綁定到隊列的。

消費者:
package com.wy.testrabbitmq.testfanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @author wangyan@163.com
 * @version 1.0
 * @date 2019-06-05 10:56
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //創建連接
        Connection connection = connectionFactory.newConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        //交換機名
        String exchangeName = "testFanoutExchange";
        //隊列
        String queueName = "test002";
        //routingkey
        String routingkey = "";

        //交換機類型
        String exchangeType = "fanout";
        /**
         * 聲明一個交換機
         */
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        /**
         * 聲明一個隊列
         * 第一個參數表示這個信道連接哪個隊列
         * 第二個參數表示是否持久化,當這個參數設置為true,即使你的服務器關了從新開數據還是存在的
         * 第三個參數表示是否獨占隊列,也就是所只能自己去監聽這個隊列
         * 第四個參數表示隊列脫離綁定時是否自動刪除
         * 第五個參數表示擴展參數,可設置為null
         */
        channel.queueDeclare(queueName, true, false, false, null);

        //建立綁定關系(哪個隊列,哪個交換機,綁定哪個routingkey)
        channel.queueBind(queueName, exchangeName, routingkey);
        //創建消費者,指定建立在那個連接上
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //設置channel
        // 第二個參數 是否自動簽收
        // 第三個參數表示消費對象
        channel.basicConsume(queueName, true, queueingConsumer);
        //獲取消息
        //int i=0;
        while (true) {
            // 沒有消息就阻塞
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("消費端接收消息:" + message);
        }
    }
}

生產者:
package com.wy.testrabbitmq.testfanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**交換機
 * @author wangyan@163.com
 * @version 1.0
 * @date 2019-06-05 10:16
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //創建連接
        Connection connection = connectionFactory.newConnection();
        // 創建通道
        Channel channel=connection.createChannel();
       //聲明交換機
        String exchangeName="testFanoutExchange";
        // 發送消息
        String message="這是一條測試direct的數據";
        //wewe是隨便寫的routingkey,這里為了驗證fanout交換機和路由沒關系
        for (int i = 1; i <= 7; i++) {
            channel.basicPublish(exchangeName,"wewe",null,message.getBytes());
        }
        //注意:關閉連接
        channel.close();
        connection.close();
    }
}

  

示例代碼中,我在生產端寫了一個不存在的routingkey,如下:

//wewe是隨便寫的routingkey,這里為了驗證fanout交換機和路由沒關系
for (int i = 1; i <= 7; i++) {
channel.basicPublish(exchangeName,“wewe”,null,message.getBytes());
}

消費端並未指定routingkey,如下:

String routingkey = “”;

我們只在消費端綁定了Queue,運行結果證明在fanout交換機下,不使用路由並不影響我們生產者投遞信息,消費者訂閱信息。

再看一下我們的管控台:生成了交換機
在這里插入圖片描述
我們Bindings中並沒有Routingkey
在這里插入圖片描述
這個也充分說明了fanout交換機是“無路由交換機”。

總結:

fanout交換機不需要routingkey綁定,和路由沒有關系,它是直接綁定到隊列的。
fanout交換機直接綁定了隊列,沒有經過routingkey進行匹配之類的,相對來說是所有交換機中最快的。

原文地址:  地址一   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM