RabbitMQ詳解三-RabbitMQ的五種隊列


1、簡單隊列

其實上篇文章末尾給出的代碼就是簡單隊列。

  

  一個生產者對應一個消費者!!!

生產者將消息發送到“hello”隊列。消費者從該隊列接收消息。

  ①、pom文件

  必須導入rabbitmq 依賴包

 <dependency>
    <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>3.4.1</version>
   </dependency>

②、工具類

package com.ys.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Create by hadoop
 */
public class ConnectionUtil {

    public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{
        //1、定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2、設置服務器地址
        factory.setHost(host);
        //3、設置端口
        factory.setPort(port);
        //4、設置虛擬主機、用戶名、密碼
        factory.setVirtualHost(vHost);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        //5、通過連接工廠獲取連接
        Connection connection = factory.newConnection();
        return connection;
    }
}
View Code

、生產者 Producer

package com.ys.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.ys.utils.ConnectionUtil;

/**
 * Create by YSOcean
 */
public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明信道
        Channel channel = connection.createChannel();
        //3、聲明(創建)隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、定義消息內容
        String message = "hello rabbitmq ";
        //5、發布消息
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("[x] Sent'"+message+"'");
        //6、關閉通道
        channel.close();
        //7、關閉連接
        connection.close();
    }
}
View Code

、消費者Consumer

package com.ys.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
 * Create by YSOcean
 */
public class Consumer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明通道
        Channel channel = connection.createChannel();
        //3、聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //5、監聽隊列
        /*
            true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息已經成功消費
            false:表示手動確認,消費者獲取消息后,服務器會將該消息標記為不可用狀態,等待消費者的反饋,
                   如果消費者一直沒有反饋,那么該消息將一直處於不可用狀態,並且服務器會認為該消費者已經掛掉,不會再給其
                   發送消息,直到該消費者反饋。
         */

        channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
        //6、獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }

}
View Code

注意這里消費者有自動確認消息和手動確認消息兩種模式。

2、work 模式

 

  一個生產者對應多個消費者,但是只能有一個消費者獲得消息!!!

  競爭消費者模式。

  ①、生產者

package com.ys.workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.ys.utils.ConnectionUtil;

/**
 * Create by YSOcean
 */
public class Producer {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明信道
        Channel channel = connection.createChannel();
        //3、聲明(創建)隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、定義消息內容(發布多條消息)
        for(int i = 0 ; i < 10 ; i++){
            String message = "hello rabbitmq "+i;
            //5、發布消息
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("[x] Sent'"+message+"'");
            //模擬發送消息延時,便於演示多個消費者競爭接受消息
            Thread.sleep(i*10);
        }
        //6、關閉通道
        channel.close();
        //7、關閉連接
        connection.close();
    }
}
View Code

②、消費者

  這里創建兩個消費者

  消費者1:每接收一條消息后休眠10毫秒

package com.ys.workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
 * Create by YSOcean
 */
public class Consumer1 {

    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明通道
        Channel channel = connection.createChannel();
        //3、聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //同一時刻服務器只會發送一條消息給消費者
        //channel.basicQos(1);

        //4、定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //5、監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
        //6、獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //消費者1接收一條消息后休眠10毫秒
            Thread.sleep(10);
            //返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }

}
View Code

       消費者2:每接收一條消息后休眠1000毫秒

package com.ys.workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
 * Create by YSOcean
 */
public class Consumer2 {

    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明通道
        Channel channel = connection.createChannel();
        //3、聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //同一時刻服務器只會發送一條消息給消費者
        //channel.basicQos(1);

        //4、定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //5、監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
        //6、獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //消費者2接收一條消息后休眠1000毫秒
            Thread.sleep(1000);
            //返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }

}
View Code

③、測試結果

  首先生產者一次打印從0-9條消息

 

  接着我們看消費者1:結果為打印偶數條消息

  

  消費者2:結果為打印奇數條消息

  

  ④、分析結果

  消費者1和消費者2獲取到的消息內容是不同的,也就是說同一個消息只能被一個消費者獲取。

  消費者1和消費者2分別獲取奇數條消息和偶數條消息,兩種獲取消息的條數是一樣的。

  前面我們說這種模式是競爭消費者模式,一條隊列被多個消費者監聽,這里兩個消費者,其中消費者1和消費者2在獲取消息后分別休眠了10毫秒和1000毫秒,也就是說兩個消費者獲取消息的效率是不一樣的,但是結果卻是兩者獲得的消息條數是一樣的,這根本就不構成競爭關系,那么我們應該怎么辦才能讓工作效率高的消費者獲取消息更多,也就是消費者1獲取消息更多呢?

  PS:在增加一個消費者其實獲取消息條數也是一樣的,消費者1獲取0,3,6,9,消費者2獲取1,4,7,消費者3獲取2,5,8

  ⑤、能者多勞

channel.basicQos(1);

增加如上代碼,表示同一時刻服務器只會發送一條消息給消費者。消費者1和消費者2獲取消息結果如下:

  

  

  ⑥、應用場景

  效率高的消費者消費消息多。可以用來進行負載均衡。

 

3、發布/訂閱模式

  

  一個消費者將消息首先發送到交換器,交換器綁定到多個隊列,然后被監聽該隊列的消費者所接收並消費。

  ps:X表示交換器,在RabbitMQ中,交換器主要有四種類型:direct、fanout、topic、headers,這里的交換器是 fanout。下面我們會詳細介紹這幾種交換器。

  ①、生產者

 

package com.ys.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.ys.utils.ConnectionUtil;

/**
 * Create by YSOcean
 */
public class Producer {
    private final static String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest");
        //2、聲明信道
        Channel channel = connection.createChannel();
        //3、聲明交換器
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //4、創建消息
        String message = "hello rabbitmq";
        //5、發布消息
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println("[x] Sent'" + message + "'");
        //6、關閉通道
        channel.close();
        //7、關閉連接
        connection.close();
    }
}
View Code

②、消費者

  消費者1:

package com.ys.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
 * Create by YSOcean
 */
public class Consumer1 {

    private final static String QUEUE_NAME = "fanout_queue_1";

    private final static String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明通道
        Channel channel = connection.createChannel();
        //3、聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、綁定隊列到交換機
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        //同一時刻服務器只會發送一條消息給消費者
        channel.basicQos(1);
        //5、定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6、監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
        //6、獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 消費者1:" + message + "'");
            //消費者1接收一條消息后休眠10毫秒
            Thread.sleep(10);
            //返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }

}
View Code

       消費者2:

package com.ys.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
 * Create by YSOcean
 */
public class Consumer2 {

    private final static String QUEUE_NAME = "fanout_queue_2";

    private final static String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明通道
        Channel channel = connection.createChannel();
        //3、聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、綁定隊列到交換機
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        //同一時刻服務器只會發送一條消息給消費者
        channel.basicQos(1);
        //5、定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6、監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
        //6、獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 消費者2:" + message + "'");
            //消費者2接收一條消息后休眠10毫秒
            Thread.sleep(1000);
            //返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }
}
View Code

注意:消費者1和消費者2兩者監聽的隊列名稱是不一樣的,我們可以通過前台管理系統看到:

  

  ③、測試結果

  

  

  

  消費1和消費者2都消費了該消息。

  ps:這是因為消費者1和消費者2都監聽了被同一個交換器綁定的隊列。如果消息發送到沒有隊列綁定的交換器時,消息將丟失,因為交換器沒有存儲消息的能力,消息只能存儲在隊列中。

  ④、應用場景

  比如一個商城系統需要在管理員上傳商品新的圖片時,前台系統必須更新圖片,日志系統必須記錄相應的日志,那么就可以將兩個隊列綁定到圖片上傳交換器上,一個用於前台系統更新圖片,另一個用於日志系統記錄日志。

 

4、路由模式

  生產者將消息發送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產者發送的消息會指定一個路由key,那么消息只會發送到相應key相同的隊列,接着監聽該隊列的消費者消費消息。

  也就是讓消費者有選擇性的接收消息。

  ①、生產者

package com.ys.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.ys.utils.ConnectionUtil;

/**
 * Create by YSOcean
 */
public class Producer {
    private final static String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest");
        //2、聲明信道
        Channel channel = connection.createChannel();
        //3、聲明交換器,類型為direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //4、創建消息
        String message = "hello rabbitmq";
        //5、發布消息
        channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes());
        System.out.println("生產者發送" + message + "'");
        //6、關閉通道
        channel.close();
        //7、關閉連接
        connection.close();
    }
}
View Code

②、消費者

  消費者1:

package com.ys.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
 * Create by YSOcean
 */
public class Consumer1 {

    private final static String QUEUE_NAME = "direct_queue_1";

    private final static String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明通道
        Channel channel = connection.createChannel();
        //3、聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、綁定隊列到交換機,指定路由key為update
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"add");
        //同一時刻服務器只會發送一條消息給消費者
        channel.basicQos(1);
        //5、定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6、監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
        //6、獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 消費者1:" + message + "'");
            //消費者1接收一條消息后休眠10毫秒
            Thread.sleep(10);
            //返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }

}
View Code

       消費者2:

package com.ys.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
 * Create by YSOcean
 */
public class Consumer2 {

    private final static String QUEUE_NAME = "direct_queue_2";

    private final static String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明通道
        Channel channel = connection.createChannel();
        //3、聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、綁定隊列到交換機,指定路由key為select
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select");
        //同一時刻服務器只會發送一條消息給消費者
        channel.basicQos(1);
        //5、定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6、監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
        //6、獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 消費者1:" + message + "'");
            //消費者2接收一條消息后休眠10毫秒
            Thread.sleep(1000);
            //返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }

}
View Code

③、測試結果

  我們首先看代碼,生產者發布消息,指定的路由key為update。消費者1綁定隊列和交換機時key分別是update/delete/add;消費者2綁定隊列和交換器時key是select。

  所以我們可以猜測生產者發送的消息,只有消費者1能夠接收並消費,而消費者2是不能接收的。

  

  

  

  ④、應用場景

  利用消費者能夠有選擇性的接收消息的特性,比如我們商城系統的后台管理系統對於商品進行修改、刪除、新增操作都需要更新前台系統的界面展示,而查詢操作確不需要,那么這兩個隊列分開接收消息就比較好。

 

5、主題模式

  上面的路由模式是根據路由key進行完整的匹配(完全相等才發送消息),這里的通配符模式通俗的來講就是模糊匹配。

  符號“#”表示匹配一個或多個詞,符號“*”表示匹配一個詞。

  ①、生產者

package com.ys.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.ys.utils.ConnectionUtil;

/**
 * Create by YSOcean
 */
public class Producer {
    private final static String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest");
        //2、聲明信道
        Channel channel = connection.createChannel();
        //3、聲明交換器,類型為direct
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //4、創建消息
        String message = "hello rabbitmq111";
        //5、發布消息
        channel.basicPublish(EXCHANGE_NAME, "update.Name", null, message.getBytes());
        System.out.println("生產者發送" + message + "'");
        //6、關閉通道
        channel.close();
        //7、關閉連接
        connection.close();
    }
}
View Code

②、消費者

  消費者1:

package com.ys.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
 * Create by YSOcean
 */
public class Consumer1 {

    private final static String QUEUE_NAME = "topic_queue_1";

    private final static String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明通道
        Channel channel = connection.createChannel();
        //3、聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、綁定隊列到交換機,指定路由key為update.#
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#");
        //同一時刻服務器只會發送一條消息給消費者
        channel.basicQos(1);
        //5、定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6、監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
        //6、獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 消費者1:" + message + "'");
            //消費者1接收一條消息后休眠10毫秒
            Thread.sleep(10);
            //返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }

}
View Code

       消費2:

package com.ys.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
 * Create by YSOcean
 */
public class Consumer2 {

    private final static String QUEUE_NAME = "topic_queue_2";

    private final static String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception{
        //1、獲取連接
        Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
        //2、聲明通道
        Channel channel = connection.createChannel();
        //3、聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、綁定隊列到交換機,指定路由key為select.#
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select.#");
        //同一時刻服務器只會發送一條消息給消費者
        channel.basicQos(1);
        //5、定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6、監聽隊列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
        //6、獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 消費者1:" + message + "'");
            //消費者2接收一條消息后休眠10毫秒
            Thread.sleep(1000);
            //返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }

}
View Code

③、分析結果

  生產者發送消息綁定的路由key為update.Name;消費者1監聽的隊列和交換器綁定路由key為update.#;消費者2監聽的隊列和交換器綁定路由key為select.#。

  很顯然,消費者1會接收到消息,而消費者2接收不到。

6、四種交換器

前面五種隊列模式介紹完了,但是實際上只有三種,第一種簡單隊列,第二種工作模式,剩下的三種都是和交換器綁定的合起來稱為一種,這小節我們就來詳細介紹交換器。

  交換器分為四種,分別是:direct、fanout、topic和 headers。

  前面三種分別對應路由模式、發布訂閱模式和通配符模式,headers 交換器允許匹配 AMQP 消息的 header 而非路由鍵,除此之外,header 交換器和 direct 交換器完全一致,但是性能卻差很多,因此基本上不會用到該交換器,這里也不詳細介紹。

  ①、direct

  如果路由鍵完全匹配的話,消息才會被投放到相應的隊列。

   

  ②、fanout

  當發送一條消息到fanout交換器上時,它會把消息投放到所有附加在此交換器上的隊列。

   

  ③、topic

  設置模糊的綁定方式,“*”操作符將“.”視為分隔符,匹配單個字符;“#”操作符沒有分塊的概念,它將任意“.”均視為關鍵字的匹配部分,能夠匹配多個字符。

  

 

 

7、總結

關於 RabbitMQ 的五種隊列,其實實際使用最多的是最后一種主題模式,通過模糊匹配,使得操作更加自如。那么我們總結一下有交換器參與的隊列(最后三種隊列)工作方式如下:

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM