RabbitMQ 五種工作模式


官網介紹:https://www.rabbitmq.com/getstarted.html

五種工作模式的主要特點

  1. 簡單模式:一個生產者,一個消費者
  2. work模式:一個生產者,多個消費者,每個消費者獲取到的消息唯一(消費者彼此競爭成為接收者)。
  3. 訂閱模式:一個生產者發送的消息會被多個消費者獲取。
  4. 路由模式:發送消息到交換機並且要指定路由key ,消費者將隊列綁定到交換機時需要指定路由key
  5. topic模式:將路由鍵和某模式進行匹配,此時隊列需要綁定在一個模式上,“#”匹配一個詞或多個詞,“*”只匹配一個詞。

簡單模式(一個生產者,一個消費者)

這種模式下不需要將Exchange進行任何綁定(binding)操作

    public static final String QUEUE_NAME= "myqueue";

    public static void test() throws Exception {
        //定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        //設置Virtual Host
        factory.setVirtualHost("/ld");
        factory.setUsername("ld");
        factory.setPassword("aaa");
        //通過工廠獲取連接
        Connection connection = factory.newConnection();


        //創建隊列,發送消息
        public void producer () {
            //創建通道
            Channel channel = connection.createChannel();
            //聲明創建隊列
            /** 
                隊列名
                是否持久化
                是否排外  即只允許該channel訪問該隊列   一般等於true的話用於一個隊列只能有一個消費者來消費的場景
                是否自動刪除(自動刪除的前提是: 至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會 自動刪除。)
                其他屬性
            */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //消息內容
            String message = "Hello World!";
            //發布消息
            /**
                交換機
                隊列名
                其他屬性  路由
                消息body
            */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            //關閉連接和通道
            channel.close();
            connection.close();
        }
        //消費者消費消息
        public void consumer () {
            //創建通道
            Channel channel = connection.createChannel();
            //聲明通道
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //定義消費者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //監聽隊列
            //autoAck 是否自動確認消息,true自動確認
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                //這個方法會阻塞住,直到獲取到消息
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("接收到消息:" + message);
            }
        }
    }

work模式(一個生產者,一個隊列,多個消費者,每個消費者獲取到的消息唯一)

    public static final String QUEUE_NAME= "myqueue";

    //消息生產者
    public void producer{
        //獲取連接和通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "";
        for (int i = 0; i < 100; i++) {
            message = "" + i;
            channel.basicPublish("",QUEUE_NAME, null, message.getBytes());
            Thread.sleep(i);
        }
        channel.close();
        connection.close();
    }


    //消費者1
    public void consumer1{
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //同一時刻服務器只發送一條消息給消費端
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //false:手動確認
        channel.basicConsume(QUEUE_NAME, false, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("recive1:" + message);
            Thread.sleep(100);
            //消息消費完給服務器返回確認狀態,表示該消息已被消費
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

channel.basicPublish

channel.basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)

mandatory:
    true:如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,
    那么會調用basic.return方法將消息返還給生產者。

    false:出現上述情形broker會直接將消息扔掉

immediate:
    true:如果exchange在將消息route到queue(s)時發現對應的queue上沒有消費者,那么這條消息不會放入隊列中。
    當與消息routeKey關聯的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。

BasicProperties :
    需要注意的是BasicProperties.deliveryMode:
        0:不持久化 1:持久化 
    這里指的是消息的持久化,配合channel(durable=true),queue(durable)可以實現,即使服務器宕機,消息仍然保留

fanout訂閱模式(一個生產者,多個隊列,多個消費者)

這種模式需要提前將Exchange與Queue進行綁定,
一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。

​ 一個生產者發送的消息會被多個消費者獲取。

生產者:可以將消息發送到隊列或者是交換機。
消費者:只能從隊列中獲取消息。

如果消息發送到沒有隊列綁定的交換機上,那么消息將丟失。

    public static final String EXCHANGE_NAME = "exchange_fanout";

    //生產者
    public void producer() {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明交換機 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        channel.close();
        connection.close();
    }


    //消費者1
    public final static String QUEUE_NAME = "queue_fanout_1";
    public void consumer1() {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //綁定隊列到交換機上
        channel.queueBind(QUEUE_NAME, Send.EXCHANGE_NAME, "");
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }

direct路由模式(完全匹配、單播的模式)

1. 發送消息到交換機並且要指定路由key

2. 消費者將隊列綁定到交換機時需要指定路由key

3. 完全匹配,只有匹配到的消費者才能消費消息

4. 一個隊列可以綁定多個路由

    public static final String EXCHANGE_NAME = "exchange_direct";

    //生產者
    public void producer() {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明交換機 direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes()); 
        channel.close();
        connection.close();
    }

    //消費者1
    public final static String QUEUE_NAME = "queue_direct_1";

    public void consumer1() {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //綁定隊列到交換機上,並制定路由鍵為"key"
        channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.routing.Send.EXCHANGE_NAME, "key");
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }

topic通配符模式

兩個通配符:符號“#”和符號“*”。#匹配0個或多個單詞,*匹配一個單詞

    //生產者
    public static final String EXCHANGE_NAME = "exchange_topic";

    public void producer() {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明交換機 topic:交換機類型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println(message);
        channel.close();
        connection.close();
    }

    //消費者1
    public final static String QUEUE_NAME = "queue_topic_1";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //綁定隊列到交換機上,並制定路由鍵匹配規則為"key.*"
        channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.topics.Send.EXCHANGE_NAME, "key.*");
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM