RabbitMQ詳解(三)------RabbitMQ的五種模式


RabbitMQ詳解(三)------RabbitMQ的五種模式

1.簡單隊列(模式)

上一篇文章末尾的實例給出的代碼就是簡單模式.

一個生產者對應一個消費者!!!

pom.xml

​ 必須導入RabbitMQ依賴包

<!--RabbitMQ-client-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.2</version>
        </dependency>

ConnectionUtil.java

package org.alva.Utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <一句話描述>,RabbitMQ的連接工具類
 * <詳細介紹>,
 *
 */
public class ConnectionUtil {
    public static Connection getConnection(String host, int port, String vhost, String username, String password) throws IOException, TimeoutException {
        //1.定義連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.設置服務器地址
        connectionFactory.setHost(host);
        //3.設置端口
        connectionFactory.setPort(port);
        //4.設置虛擬主機,用戶名,密碼
        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        //5.通過連接工廠獲取連接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

Consumer.java

package org.alva.RabbitMQ;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.alva.Utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <一句話描述>,消費者
 * <詳細介紹>,
 *
 */
public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.獲取連接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //2.聲明通道
        Channel channel = connection.createChannel();
        //3.聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4.定義隊列的消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //5.監聽隊列
        /*
            true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息成功消費.
            false:表示手動確認,消費者獲取消息后,服務器會將該消息標記為不可用狀態,等待消費者的反饋,
            如果消費者一直沒有反饋,那么該消息將一直處於不可用狀態,並且服務器會認為該消費者已經掛掉,不會再給其發送消息,
            直到該消費者反饋.
        */
        channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
        //6.獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received '" + message + "'");
        }
    }
}

Productor.java

package org.alva.RabbitMQ;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.alva.Utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <一句話描述>,生產者
 * <詳細介紹>,
 *
 */
public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.獲取連接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //2.聲明通道
        Channel channel = connection.createChannel();
        //3.聲明(創建)隊列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4.定義消息內容
        String message = "hello rabbitmq";
        //5.發布消息
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("[x] send'"+message+"'");
        //6.關閉通道和連接
        channel.close();
        connection.close();

    }
}


2.work模式

一個生產者對應多個消費者,但是只能有一個消費者獲得消息!!!

競爭消費者模式.

  1. 生產者

    package org.alva.RabbitMQ.WorkModel;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,生產者
     * <詳細介紹>,Work模式下的生產者
     *
     */
    public class Producter {
        public static final String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            //2.聲明信道
            Channel channel = connection.createChannel();
            //3.聲明(創建)隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //4.定義消息內容,發布多條消息
            for (int i = 0; i < 10; i++) {
                String message = "hello rabbitmq " + i;
                //5.發布消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("[x] send message is '" + message + "'");
                //6.模擬發送消息延時,便於展示多個消費者競爭接受消息
                Thread.sleep(i * 10);
            }
            //7.關閉信道
            channel.close();
            //8.關閉連接
            connection.close();
        }
    }
    
    
  2. 消費者

    需要創建兩個消費者.

    消費者1:每接收一條消息后休眠10毫秒.

    package org.alva.RabbitMQ.WorkModel;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,消費者
     * <詳細介紹>,Work模式下的消費者
     *
     */
    public class Consumer1 {
        public static final String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            //2.聲明通道
            Channel channel = connection.createChannel();
            //3.聲明隊列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //同一時刻服務器只會發送一條消息給消費者
    //        channel.basicQos(1);
    
            //4.定義隊列的消費者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            //5.監聽隊列,手動返回完成狀態
            channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
            //6.獲取消息
            while (true){
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] received message : '"+message+"'");
                //休眠10毫秒
                Thread.sleep(10);
                //返回確認狀態
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        }
    }
    
    

    消費者2:每接收一條消息后休眠1000毫秒

    package org.alva.RabbitMQ.WorkModel;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,
     * <詳細介紹>,
     *
     */
    public class Consumer2 {
        public static final String QUEUE_NAME = "work_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //        channel.basicQos(1);
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
            while (true){
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] received message : '" + message + "'");
                Thread.sleep(1000);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        }
    }
    
    
  3. 測試結果

    1. 首先生產者一次打印從0-9條消息

      image-20180819000057095

    2. 然后是消費者1:結果為打印偶數條消息(注:先啟動的消費者為消費者1)

      image-20180819000259211

    3. 消費者2:結果為打印奇數條消息

      image-20180819000335579

    結論:

    ​ *消費者1和消費者2獲取到的消息內容是不同的,也就是說同一個消息只能被一個消費者獲取.

    ​ *消費者1和消費者2分別獲取奇數條消息和偶數條消息,兩種獲取消息的條數是一樣的.

    ​ 前面我們說這種模式是競爭消費者模式,一條隊列被多個消費者監聽,這里兩個消費者,其中消費者1和消費者2在獲取消息后分別休眠了10毫秒和1000毫秒,也就是說兩個消費者獲取消息的效率是不一樣的,但是結果卻是兩者獲得的消息條數是一樣的,這根本不構成競爭關系,那么我們應該怎么辦才能讓工作效率更高的消費者獲取消息更多,也就是消費者1獲取消息更多呢?

    1. 能者多勞

      channel.basicQos(1);
      

      增加如上代碼,表示同一時刻服務器只會發送一條消息給消費者.消費者1和消費者2獲取消息結果如下:

      image-20180819001133009

      image-20180819001145486

    2. 應用場景

      效率高的消費者消費消息多,可以用來進行負載均衡.

3.發布/訂閱模式

一個消費者將消息首先發送到交換器,交換器綁定多個隊列,然后被監聽該隊列的消費者所接收並消費.

*在RabbitMQ中,交換器主要有四種類型:direct,fanout,topic,headers,這里的交換器是fanout.

  1. 生產者

    package org.alva.RabbitMQ.PublishModel;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,發布/訂閱模式下的生產者
     * <詳細介紹>,
     *
     */
    public class Producer {
        private final static String EXCHANGE_NAME = "fanout_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5674, "/", "guest", "guest");
            //2.聲明信道
            Channel channel = connection.createChannel();
            //3.聲明交換器
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //4.定義消息內容
            String message = "hello rabbitmq";
            //5.發布消息
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println("[x] send'" + message + "'");
            //6.關閉通道和連接
            channel.close();
            connection.close();
    
        }
    }
    
    
  2. 消費者

    消費者1:

    package org.alva.RabbitMQ.PublishModel;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,消費者
     * <詳細介紹>,發布/訂閱模式下的消費者
     *
     */
    public class Consumer1 {
        public static final String QUEUE_NAME = "fanout_queue_1";
    
        public static final String EXCHANGE_NAME = "fanout_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            //2.聲明信道
            Channel channel = connection.createChannel();
            //3.聲明交換器
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //4.綁定隊列到交換器
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //同一時刻服務器只會發送一條消息給消費者
            channel.basicQos(1);
    
            //5.定義隊列的消費者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            //5.監聽隊列,手動返回完成狀態
            channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
            //6.獲取消息
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[消費者1] received message : '" + message + "'");
                //休眠10毫秒
                Thread.sleep(10);
                //返回確認狀態
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    

    消費者2:

    package org.alva.RabbitMQ.PublishModel;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,
     * <詳細介紹>,
     *
     */
    public class Consumer2 {
        public static final String QUEUE_NAME = "fanout_queue_2";
    
        private final static String EXCHANGE_NAME = "fanout_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
            channel.basicQos(1);
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[消費者2] received message : '" + message + "'");
                Thread.sleep(1000);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    

    注意:消費者1和消費者2兩者監聽的隊列名稱是不一樣的.

  3. 測試結果

    image-20180819235254849

    image-20180819235351940

    image-20180819235425091

    消費者1和消費者2都消費了該消息.

    ps:這是因為消費者1和消費者2都監聽了被同一個交換器綁定的隊列.如果消息發送到沒有隊列綁定的交換器時,消息將丟失,因為交換器沒有存儲消息的能力,消息只能存儲在隊列.

  4. 應用場景:

    比如一個商城系統需要在管理員上傳新的商品圖片時,前台系統必須更新圖片,日志系統必須記錄相應的日志,那么就可以將兩個隊列綁定到圖片上傳交換器上,一個用於前台系統剛更新圖片,另一個用於日志系統記錄日志.

4.路由模式

生產者將消息發送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產者發送的消息會指定一個路由key,那么消息只會發送到相應key相同的隊列,接着監聽該隊列的消費者消費信息.

  1. 生產者

    package org.alva.RabbitMQ.DirectExchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,路由模式下的生產者
     * <詳細介紹>,
     *
     */
    public class Producer {
        private final static String EXCHANGE_NAME = "direct_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5674, "/", "guest", "guest");
            //2.聲明信道
            Channel channel = connection.createChannel();
            //3.聲明交換器,類型為direct
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //4.定義消息內容
            String message = "hello rabbitmq";
            //5.發布消息
            channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes());
            System.out.println("[x] send'" + message + "'");
            //6.關閉通道和連接
            channel.close();
            connection.close();
    
        }
    }
    
    
  2. 消費者

    消費者1:

    package org.alva.RabbitMQ.DirectExchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,消費者
     * <詳細介紹>,路由模式下的消費者1
     * <p>
     * 這種模式添加了一個路由鍵,生產者發布消息的時候添加路由鍵,消費者綁定隊列到交換機時添加鍵值,
     * 這樣就可以接收到需要接收的消息。
     *
     */
    public class Consumer1 {
        public static final String QUEUE_NAME = "direct_queue_1";
    
        public static final String EXCHANGE_NAME = "direct_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            //2.聲明信道
            Channel channel = connection.createChannel();
            //3.聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //4.綁定隊列到交換器,指定路由key為update
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "add");
            //同一時刻服務器只會發送一條消息給消費者
            channel.basicQos(1);
    
            //5.定義隊列的消費者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            //5.監聽隊列,手動返回完成狀態
            channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
            //6.獲取消息
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[消費者1] received message : '" + message + "'");
                //休眠10毫秒
                Thread.sleep(10);
                //返回確認狀態
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    

    消費者2:

    package org.alva.RabbitMQ.DirectExchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,消費者
     * <詳細介紹>,路由模式下的消費者2
     *
     */
    public class Consumer2 {
        public static final String QUEUE_NAME = "direct_queue_2";
    
        public static final String EXCHANGE_NAME = "direct_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            //2.聲明信道
            Channel channel = connection.createChannel();
            //3.聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //4.綁定隊列到交換器,指定路由key為select
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select");
            //同一時刻服務器只會發送一條消息給消費者
            channel.basicQos(1);
    
            //5.定義隊列的消費者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            //5.監聽隊列,手動返回完成狀態
            channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
            //6.獲取消息
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[消費者1] received message : '" + message + "'");
                //休眠10毫秒
                Thread.sleep(10);
                //返回確認狀態
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    
  3. 測試結果

    生產者發布消息,指定的路由key為update,消費者1綁定隊列和交換器時key分別是update/delete/add;消費者2綁定隊列和交換器時key時select.

    所以可以猜到生產者發送的消息,只有消費者1能夠接收並消費,而消費者2是不能接收的.

    image-20180820101737302

    image-20180820101752244

    image-20180820101805624

  4. 應用場景

    利用消費者能夠有選擇性的接收消息的特性,比如商場系統的后台管理系統對於商品進行修改、刪除、新增操作都需要更新前台系統的界面展示,而查詢操作不需要,那么這兩個隊列分開接收消息就比較好.

5.主題模式

上面的路由模式是根據路由key進行完整的匹配(完全相等才發送消息),這里的通配符模式通俗的來講就是模糊匹配.

符號"#"表示匹配一個或多個詞,符號"*"表示匹配一個詞.

  1. 生產者

    package org.alva.RabbitMQ.TopicExchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,主題模式下的生產者
     * <詳細介紹>,
     *
     */
    public class Producer {
        private final static String EXCHANGE_NAME = "topic_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5674, "/", "guest", "guest");
            //2.聲明信道
            Channel channel = connection.createChannel();
            //3.聲明交換器,類型為direct
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            //4.定義消息內容
            String message = "hello rabbitmq";
            //5.發布消息
            channel.basicPublish(EXCHANGE_NAME, "update.Name", null, message.getBytes());
            System.out.println("[x] send'" + message + "'");
            //6.關閉通道和連接
            channel.close();
            connection.close();
    
        }
    }
    
    
  2. 消費者1

    package org.alva.RabbitMQ.TopicExchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,消費者
     * <詳細介紹>,主題模式下的消費者1
     *
     */
    public class Consumer1 {
        public static final String QUEUE_NAME = "topic_queue_1";
    
        public static final String EXCHANGE_NAME = "topic_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5673, "/", "guest", "guest");
            //2.聲明信道
            Channel channel = connection.createChannel();
            //3.聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //4.綁定隊列到交換器,指定路由key為update
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update.#");
            //同一時刻服務器只會發送一條消息給消費者
            channel.basicQos(1);
    
            //5.定義隊列的消費者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            //5.監聽隊列,手動返回完成狀態
            channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
            //6.獲取消息
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[消費者1] received message : '" + message + "'");
                //休眠10毫秒
                Thread.sleep(10);
                //返回確認狀態
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    
  3. 消費者2

    package org.alva.RabbitMQ.TopicExchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import org.alva.Utils.ConnectionUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <一句話描述>,消費者
     * <詳細介紹>,主題模式下的消費者2
     *
     * @author 穆國超
     * @since 設計wiki | 需求wiki
     */
    public class Consumer2 {
        public static final String QUEUE_NAME = "topic_queue_2";
    
        public static final String EXCHANGE_NAME = "topic_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //1.獲取連接
            Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            //2.聲明信道
            Channel channel = connection.createChannel();
            //3.聲明隊列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //4.綁定隊列到交換器,指定路由key為select
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select.#");
            //同一時刻服務器只會發送一條消息給消費者
            channel.basicQos(1);
    
            //5.定義隊列的消費者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            //5.監聽隊列,手動返回完成狀態
            channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
            //6.獲取消息
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[消費者1] received message : '" + message + "'");
                //休眠10毫秒
                Thread.sleep(1000);
                //返回確認狀態
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    
  4. 分析結果

    生產者發送消息綁定的路由key為update.Name;消費者1監聽的隊列和交換器綁定路由key為update.#;消費者2監聽的隊列和交換器綁定路由key為select.#.

    很顯然,消費者1會接收到消息,而消費者2接收不到

    image-20180902151433116

    image-20180902151448651

    image-20180902151458185

6.四種交換器

​ 前面介紹了五種隊列模式,但是實際上只有三種,第一種簡單隊列,第二種工作模式,剩下的三種都是和交換器綁定的合起來稱為一種,這節詳細介紹交換器.

​ 交換器分為四種,分別是:direct,fanout,topic和headers.

​ 前三種分別對應路由模式,發布訂閱模式和通配符模式,headers交換器允許匹配AMQP消息的header而非路由鍵,除此之外,header交換器和direct交換器完全一致,但是性能卻差很多,因此基本上不會用到該交換器,這不做詳細介紹.

  1. direct

    如果路由鍵完全匹配的話,消息才會被投放到相應的隊列.

  2. fanout

    當發送一條消息到fanout交換器上時,它會把消息投放到所有附加在此交換器的上的隊列.

  3. topic

    設置模糊的綁定方式,"*"操作符將"."視為分隔符,匹配單個字符;"#"操作符沒有分塊的概念,它將任意"."均視為關鍵字的匹配部分,能夠匹配多個字符.

7.總結

​ 關於RabbitMQ的五種隊列,其實實際使用最多的是最后一種主題模式,通過模糊匹配,使得操作更加自如.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM